#include "config.h"

#include <sys/statvfs.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <errno.h>
#include <sys/statfs.h>

#define DBG_SUBSYS S_LIBCHUNK

#include "configure.h"
#include "cluster.h"
#include "squeue.h"
#include "cache.h"
#include "core.h"
#include "net_global.h"
#include "lich_md.h"
#include "disk.h"
#include "../replica/replica.h"
#include "../controller/md_proto.h"
#include "chunk_proto.h"
#include "chunk_bh.h"
#include "chunk_ops.h"
#include "chunk.h"
#include "chunk_cleanup.h"
#include "longtask.h"
#include "job_dock.h"
#include "dbg.h"

/*
 * NOTE(review): neither macro is referenced in the visible part of this file.
 * Presumably thresholds for chunk access analysis (update interval / hit
 * count) -- confirm against their users before relying on these values.
 */
#define __CHUNK_ANALYSIS_UPDATE__ (INT32_MAX / 2)
#define __CHUNK_ANALYSIS_HIT__ (HSM_RESET * 10)

/**
 * Generate a non-zero magic value.
 *
 * The magic is generated on the controller side and embedded in each IO; it
 * is used to maintain session consistency and does not need to be persisted.
 *
 * @return a value obtained from fastrandom() that is never 0 (the 32-bit
 *         value is converted to int on return).
 */
int chunk_magic()
{
        uint32_t value;

        for (;;) {
                value = fastrandom();
                if (value != 0) {
                        break;
                }

                /* 0 is not handed out as a magic: draw again */
                DWARN("regenerate\n");
        }

        return value;
}

/**
 * Send a connect request for one replica of a chunk.
 *
 * The request is served by replica_srv_connect() when @nid is the local node
 * (net_islocal()) and by replica_rpc_connect() otherwise; both receive the
 * same arguments.
 *
 * @return 0 on success, otherwise the error code of the underlying call.
 *         EPERM leaves through a plain goto, every other error through GOTO().
 */
STATIC int __chunk_proto_connect__(const nid_t *nid, const chkid_t *chkid,
                                   const chkid_t *parent, clockstat_t *clockstat,
                                   const lease_token_t *token, uint32_t magic, int force)
{
        int ret;

        /* pick the transport: in-process call for the local node, RPC otherwise */
        if (net_islocal(nid)) {
                ret = replica_srv_connect(nid, chkid, parent, token, magic, clockstat, force);
        } else {
                ret = replica_rpc_connect(nid, chkid, parent, token, magic, clockstat, force);
        }

        if (unlikely(ret)) {
                if (ret != EPERM) {
                        GOTO(err_ret, ret);
                }

                /* EPERM bypasses GOTO() -- presumably an expected error here; confirm */
                goto err_ret;
        }

        return 0;
err_ret:
        return ret;
}


/**
 * Connect to the replica of @chkid hosted on @nid and record the session
 * state in @repstat.
 *
 * Flow, as implemented below:
 *  1. network_connect() to @nid, which also reports the node's ltime;
 *  2. generate a fresh magic (chunk_magic()) and send the connect request;
 *     a request failing with EKEYEXPIRED is retried with a new magic
 *     (there is no retry limit);
 *  3. on success store the ltime and the magic into @repstat.
 *
 * Error mapping: ENOENT from the replica is logged (DERROR + SWARN) and
 * reported to the caller as EIO when gloconf.nodata_unsafe is set, ENODATA
 * otherwise. EPERM is returned through a plain goto, other errors via GOTO().
 *
 * @param nid       node hosting the replica
 * @param chkid     chunk to connect
 * @param parent    passed through to the replica connect call
 * @param force     must be 0 or 1 (asserted)
 * @param token     passed through to the replica connect call
 * @param clockstat passed through to the replica connect call
 * @param repstat   must be non-NULL on the success path (asserted); receives
 *                  ltime and magic
 * @return 0 on success, otherwise an errno-style code.
 */
int chunk_connect(const nid_t *nid, const chkid_t *chkid, const chkid_t *parent,
                  int force, const lease_token_t *token, clockstat_t *clockstat, repstat_t *repstat)
{
        int ret;
        uint32_t magic;
        time_t ltime;

        YASSERT(force == 0 || force == 1);

        ANALYSIS_BEGIN(0);

        ret = network_connect(nid, &ltime, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry:
        /* every attempt uses a newly generated magic */
        magic = chunk_magic();
        ret = __chunk_proto_connect__(nid, chkid, parent, clockstat, token, magic, force);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DERROR("connect chunk "CHKID_FORMAT" at %s, ret (%u) %s\n",
                               CHKID_ARG(chkid), network_rname(nid), ret, strerror(ret));
                        SWARN(0, "%s connect chunk "CHKID_FORMAT" at %s, ret (%u) %s\n",
                              M_DATA_CHUNK_WARN, CHKID_ARG(chkid),
                              network_rname(nid), ret, strerror(ret));

                        /* not allowed this happen for now */
                        //YASSERT(0);

                        /* the caller never sees ENOENT: it is remapped by configuration */
                        if (gloconf.nodata_unsafe) {
                                ret = EIO;
                        } else {
                                ret = ENODATA;
                        }
                } else {
#if ENABLE_CHUNK_DEBUG
                        DWARN("connect chunk "CHKID_FORMAT" at %s, ret (%u) %s\n",
                                        CHKID_ARG(chkid), network_rname(nid), ret, strerror(ret));
#else
                        DBUG("connect chunk "CHKID_FORMAT" at %s, ret (%u) %s\n",
                                        CHKID_ARG(chkid), network_rname(nid), ret, strerror(ret));
#endif
                }

                if (ret == EKEYEXPIRED) {
                        goto retry;
                } else if (ret == EPERM) {
                        goto err_ret;
                } else {
                        GOTO(err_ret, ret);
                }
        }

#if ENABLE_CHUNK_DEBUG
        /*
         * The clock is printed with %ju elsewhere in this file (chunk_getclock);
         * cast explicitly so the specifier matches whatever width the field has.
         */
        DWARN("connect chunk "CHKID_FORMAT" at %s, clock (%ju) dirty:%d\n",
                        CHKID_ARG(chkid), network_rname(nid),
                        (uintmax_t)clockstat->vclock.clock, clockstat->dirty);
#endif

        YASSERT(repstat);
        YASSERT(ltime);
        repstat->ltime = ltime;
        repstat->magic = magic;

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_connect");

        return 0;
err_ret:
        return ret;
}

/**
 * Fetch the clock state of the replica of @chkid on @nid into @clockstat.
 *
 * Uses replica_srv_getclock() for the local node and replica_rpc_getclock()
 * for a remote one; only the remote path logs the failure at debug level
 * before leaving.
 *
 * @return 0 on success, otherwise the error code of the underlying call.
 */
int chunk_getclock(const nid_t *nid, const chkid_t *chkid, clockstat_t *clockstat)
{
        int ret;

        if (!net_islocal(nid)) {
                ret = replica_rpc_getclock(nid, chkid, clockstat);
                if (unlikely(ret)) {
                        DBUG("getclock chunk "CHKID_FORMAT" at %s, ret (%u) %s\n",
                              CHKID_ARG(chkid),
                              network_rname(nid), ret, strerror(ret));
                        GOTO(err_ret, ret);
                }
        } else {
                ret = replica_srv_getclock(nid, chkid, clockstat);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        /* same message either way; only the log level depends on the build */
#if ENABLE_CHUNK_DEBUG
        DINFO("getclock chunk "CHKID_FORMAT" at %s clock %ju\n",
              CHKID_ARG(chkid), network_rname(nid), clockstat->vclock.clock);
#else
        DBUG("getclock chunk "CHKID_FORMAT" at %s clock %ju\n",
             CHKID_ARG(chkid), network_rname(nid), clockstat->vclock.clock);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Decide whether one replica may be treated as consistent.
 *
 * A replica is consistent when chunk_replica_intact() accepts it, or when its
 * node has not been reset (reset == 0) and either
 *  - its status is __S_DIRTY (recovery will be executed asynchronously), or
 *  - vfm_exist() reports its node in @vfm.
 *
 * With ENABLE_VFM, @vfm must be non-NULL for raw chunks (asserted); without
 * it, @vfm is ignored and replaced by NULL.
 *
 * @return 1 if consistent, 0 otherwise.
 */
int IO_FUNC chunk_replica_consistent(const chkid_t *chkid, const reploc_t *reploc, time_t _ltime, const vfm_t *vfm)
{
        int reset;

#if ENABLE_VFM
        if (likely(chkid->type == __RAW_CHUNK__)) {
                YASSERT(vfm);
        }
#else
        vfm = NULL;
#endif

        if (chunk_replica_intact(chkid, reploc, _ltime, &reset)) {
                return 1;
        }

        /* recovery will be exec asynchronously */
        if (reset == 0 && reploc->status == __S_DIRTY) {
                DBUG("chunk "CHKID_FORMAT" @ %s status %d, reset %u\n",
                     CHKID_ARG(chkid), network_rname(&reploc->id), reploc->status, reset);

                return 1;
        }

        if (reset == 0 && vfm_exist(vfm, &reploc->id)) {
                char vfm_str[MAX_BUF_LEN];

                vfm_dump(vfm, vfm_str);
                DBUG("chunk "CHKID_FORMAT" @ %s status %d, vfm %s, reset %u\n",
                     CHKID_ARG(chkid), network_rname(&reploc->id), reploc->status, vfm_str, reset);

                return 1;
        }

        DBUG("chunk "CHKID_FORMAT" @ %s status %d, reset %u\n",
             CHKID_ARG(chkid), network_rname(&reploc->id), reploc->status, reset);

        return 0;
}

/**
 * Check whether the session with a replica's node is still the one recorded.
 *
 * @param reploc replica location; its node id is passed to network_connect()
 * @param _ltime ltime recorded for this replica
 * @return 1 when network_connect() succeeds and reports the same ltime as
 *         @_ltime, 0 when the connect fails or the ltime differs.
 */
static int __chunk_replica_connected(const reploc_t *reploc, time_t _ltime)
{
        time_t current;

        /* unreachable node: not connected */
        if (unlikely(network_connect(&reploc->id, &current, 1, 0))) {
                return 0;
        }

        /* a different ltime means the recorded session was reset */
        return _ltime == current;
}

/**
 * Count the replicas of a chunk that are connected and not listed in @vfm.
 *
 * Side effect: a replica that is connected but reported by vfm_exist() gets
 * its recorded ltime in @chkstat cleared to 0 and is not counted.
 *
 * @return number of connected replicas that are not in @vfm.
 */
int chunk_proto_connected(const chkinfo_t *chkinfo, chkstat_t *chkstat, const vfm_t *vfm)
{
        int idx;
        int count = 0;

        for (idx = 0; idx < (int)chkinfo->repnum; idx++) {
                if (!likely(__chunk_replica_connected(&chkinfo->diskid[idx],
                                                      chkstat->repstat[idx].ltime))) {
                        continue;
                }

                if (vfm_exist(vfm, &chkinfo->diskid[idx].id)) {
                        chkstat->repstat[idx].ltime = 0;
                        continue;
                }

                count++;
        }

        return count;
}

/**
 * Check whether a single replica is intact.
 *
 * A replica is intact when network_connect() to its node succeeds, the ltime
 * it reports equals @_ltime, and the replica status is __S_CLEAN.
 *
 * @param _reset optional out-parameter; set to 1 when the node is reachable
 *               but its ltime differs from @_ltime, 0 otherwise
 * @return 1 if intact, 0 otherwise. The same debug line is logged either way.
 */
int IO_FUNC chunk_replica_intact(const chkid_t *chkid, const reploc_t *reploc, time_t _ltime, int *_reset)
{
        int offline, reset, intact;
        time_t ltime = 0;

        if (unlikely(network_connect(&reploc->id, &ltime, 1, 0))) {
                offline = 1;
                reset = 0;
                ltime = 0;
        } else {
                offline = 0;
                reset = (_ltime != ltime) ? 1 : 0; /* node online, but session changed */
        }

        if (_reset) {
                *_reset = reset;
        }

        intact = likely(offline == 0 && reset == 0 && reploc->status == __S_CLEAN) ? 1 : 0;

        DBUG("chunk "CHKID_FORMAT" @ %s offline %d reset %d (%lu %lu) status %d\n",
             CHKID_ARG(chkid), network_rname(&reploc->id), offline, reset, _ltime, ltime, reploc->status);

        return intact;
}

/**
 * Check that every replica of a chunk passes chunk_replica_consistent().
 *
 * With ENABLE_VFM, @vfm must be non-NULL for raw chunks (asserted); without
 * it, @vfm is ignored and replaced by NULL.
 *
 * @return 1 when all replicas are consistent (also when repnum is 0),
 *         0 as soon as one replica is not.
 */
int IO_FUNC chunk_proto_consistent(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm)
{
        int idx;

#if ENABLE_VFM
        if (likely(chkinfo->id.type == __RAW_CHUNK__)) {
                YASSERT(vfm);
        }
#else
        vfm = NULL;
#endif

        DBUG("chunk "CHKID_FORMAT" check\n", CHKID_ARG(&chkinfo->id));

        idx = 0;
        while (idx < (int)chkinfo->repnum) {
                /* stop at the first inconsistent replica */
                if (unlikely(!chunk_replica_consistent(&chkinfo->id, &chkinfo->diskid[idx],
                                                       chkstat->repstat[idx].ltime, vfm))) {
                        return 0;
                }

                idx++;
        }

        return 1;
}

/**
 * Tell whether all replicas of a chunk are intact.
 *
 * Each replica is checked with chunk_replica_intact(); when the count of
 * intact replicas differs from repnum, the count is logged.
 *
 * @return 1 if every replica is intact, 0 otherwise.
 */
int chunk_proto_intact(const chkinfo_t *chkinfo, const chkstat_t *chkstat)
{
        int idx;
        int intact = 0;

        for (idx = 0; idx < (int)chkinfo->repnum; idx++) {
                intact += chunk_replica_intact(&chkinfo->id, &chkinfo->diskid[idx],
                                               chkstat->repstat[idx].ltime, NULL) ? 1 : 0;
        }

        if (chkinfo->repnum == intact) {
                return 1;
        }

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" intact %u, repnum %u\n",
              CHKID_ARG(&chkinfo->id), intact, chkinfo->repnum);
#else
        DBUG("chunk "CHKID_FORMAT" intact %u\n",
              CHKID_ARG(&chkinfo->id), intact);
#endif

        return 0;
}

/**
 * Count the replicas of a chunk that are NOT intact.
 *
 * Each replica is checked with chunk_replica_intact(); when at least one is
 * not intact, the intact count is logged.
 *
 * @return repnum minus the number of intact replicas.
 */
int chunk_proto_unintact(const chkinfo_t *chkinfo, const chkstat_t *chkstat)
{
        int idx;
        int intact = 0;

        idx = 0;
        while (idx < (int)chkinfo->repnum) {
                if (chunk_replica_intact(&chkinfo->id, &chkinfo->diskid[idx],
                                         chkstat->repstat[idx].ltime, NULL)) {
                        intact++;
                }

                idx++;
        }

        if (chkinfo->repnum != intact) {
#if ENABLE_CHUNK_DEBUG
                DINFO("chunk "CHKID_FORMAT" intact %u, repnum %u\n",
                      CHKID_ARG(&chkinfo->id), intact, chkinfo->repnum);
#else
                DBUG("chunk "CHKID_FORMAT" intact %u\n",
                      CHKID_ARG(&chkinfo->id), intact);
#endif
        }

        return chkinfo->repnum - intact;
}

/**
 * Clear the recorded ltime of every replica of a chunk.
 *
 * Sets chkstat->repstat[i].ltime to 0 for i in [0, chkinfo->repnum).
 */
void chunk_proto_reset(const chkinfo_t *chkinfo, chkstat_t *chkstat)
{
        int idx = (int)chkinfo->repnum;

        /* the stores are independent, so the order does not matter */
        while (idx-- > 0) {
                chkstat->repstat[idx].ltime = 0;
        }
}

/**
 * Unlink one replica of @chkid on node @nid.
 *
 * Connects to the node with network_connect1() first, then calls
 * replica_srv_unlink() for the local node or replica_rpc_unlink() for a
 * remote one. An unlink failure is logged with DWARN.
 *
 * @param meta_version passed through to the unlink call
 * @return 0 on success, otherwise the error code of the failing step.
 */
STATIC int __chunk_unlink(const nid_t *nid, const chkid_t *chkid, uint64_t meta_version)
{
        int ret;

        ret = network_connect1(nid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = net_islocal(nid) ? replica_srv_unlink(chkid, meta_version)
                               : replica_rpc_unlink(nid, chkid, meta_version);
        if (unlikely(ret)) {
                DWARN("unlink chunk "CHKID_FORMAT" at %s, ret (%u) %s\n",
                      CHKID_ARG(chkid), network_rname(nid), ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Unlink all replicas of a chunk, last replica first.
 *
 * A replica that reports ENOENT is skipped; any other error stops the loop
 * and is returned, leaving the remaining (lower-index) replicas untouched.
 *
 * @param chkstat unused
 * @return 0 on success, otherwise the first non-ENOENT error.
 */
int chunk_proto_rep_unlink(const chkinfo_t *chkinfo, const chkstat_t *chkstat)
{
        int ret, i;

        (void) chkstat;

        CHKINFO_DUMP(chkinfo, D_INFO);

        /* delete in reverse order */
        for (i = (int)chkinfo->repnum; i-- > 0; ) {
                ret = __chunk_unlink(&chkinfo->diskid[i].id, &chkinfo->id, chkinfo->info_version);
                if (unlikely(ret) && ret != ENOENT) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Report the chunk clock for an operation, advancing it first for writes.
 *
 * @param chkstat holds chkstat_clock; incremented when @op is __OP_WRITE
 * @param clock   receives the (possibly incremented) clock value
 * @param op      operation code; only __OP_WRITE changes the clock
 */
void chunk_proto_clock(const chkinfo_t *chkinfo, chkstat_t *chkstat, uint64_t *clock, int op)
{
        if (op == __OP_WRITE) {
                chkstat->chkstat_clock++;
        }

        *clock = chkstat->chkstat_clock;

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" clock %ju, op %u\n", CHKID_ARG(&chkinfo->id),
             chkstat->chkstat_clock, op);
#else
        DBUG("chunk "CHKID_FORMAT" clock %ju, op %u\n", CHKID_ARG(&chkinfo->id),
             chkstat->chkstat_clock, op);
#endif
}

/**
 * Task body scheduled by chunk_local_write(): perform the local replica write
 * and publish the result through the context.
 *
 * On failure, EPERM / EIO / ENODEV are reported as EAGAIN, and for any
 * failure network_ltime_reset() is called for the local node with the ltime
 * stored in the context. The result is stored in ctx->retval before
 * ctx->lock is released.
 *
 * @param arg the chunk_local_write_ctx_t prepared by chunk_local_write()
 */
STATIC void __chunk_local_write__(void *arg)
{
        int ret;
        chunk_local_write_ctx_t *wctx = arg;

        ANALYSIS_BEGIN(0);

        const nid_t *self = net_getnid();
        ret = replica_srv_write(self, &wctx->io, wctx->buf, wctx->magic);
        if (unlikely(ret)) {
                switch (ret) {
                case EPERM:
                case EIO:
                case ENODEV:
                        ret = EAGAIN;
                        break;
                default:
                        break;
                }

                network_ltime_reset(self, wctx->ltime, "__chunk_local_write__");
        }

        wctx->retval = ret;

        /* releasing the lock is what lets chunk_local_write_wait() proceed */
        sy_rwlock_unlock(&wctx->lock);

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_proto_local_write");
}

/**
 * Start an asynchronous write to the local replica.
 *
 * Initialises ctx->lock, fills the context, takes the lock for writing and
 * schedules __chunk_local_write__() as a new task. That task releases the
 * lock when the write has finished, and chunk_local_write_wait() blocks on
 * the same lock to collect the result.
 *
 * @param ctx   caller-provided context; @io is copied into it, @buf is stored
 *              by pointer (so @buf presumably has to stay valid until the
 *              task has run -- confirm with callers)
 * @param io    write descriptor
 * @param buf   data to write
 * @param magic session magic passed to replica_srv_write()
 * @param ltime ltime handed to network_ltime_reset() if the write fails
 */
void chunk_local_write(chunk_local_write_ctx_t *ctx,
                                const io_t *io, const buffer_t *buf, uint32_t magic, uint32_t ltime)
{
        int ret;
        char *pname = NULL;
#if LOCK_DEBUG
        char lname[MAX_LOCK_NAME];

        pname = lname;
        /* bounded: the formatted chunk id must not overrun the fixed-size name buffer */
        snprintf(lname, sizeof(lname), "local_write."CHKID_FORMAT, CHKID_ARG(&io->id));
        /* NOTE(review): lname lives on this stack frame -- confirm sy_rwlock_init() copies the name */
#endif

        ret = sy_rwlock_init(&ctx->lock, pname);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ctx->buf = buf;
        ctx->io = *io;
        ctx->magic = magic;
        ctx->ltime = ltime;

        /* held until the scheduled task unlocks it on completion */
        ret = sy_rwlock_wrlock(&ctx->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
        schedule_task_new("chunk_local_write", __chunk_local_write__, ctx, -1);
}

/**
 * Wait for a write started with chunk_local_write() and return its result.
 *
 * Blocks on ctx->lock, which the write task releases once it has stored its
 * result in ctx->retval. If taking the lock fails with EPERM or ESTALE the
 * process leaves through EXIT(EAGAIN); any other lock error hits
 * UNIMPLEMENTED(__DUMP__).
 *
 * @return 0 on success, otherwise the error stored in ctx->retval.
 */
int chunk_local_write_wait(chunk_local_write_ctx_t *ctx)
{
        int ret;

        ret = sy_rwlock_wrlock(&ctx->lock);
        if (unlikely(ret)) {
                switch (ret) {
                case EPERM:
                case ESTALE:
                        EXIT(EAGAIN);
                        break;
                default:
                        UNIMPLEMENTED(__DUMP__);
                        break;
                }
        }

        ret = ctx->retval;
        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Set the status of the replica located on @nid.
 *
 * Checks are made in this order:
 *  - at least one replica must currently be __S_CLEAN, else EAGAIN;
 *  - @nid must be one of the chunk's replicas, else ENOENT (when @nid appears
 *    more than once, the last match is used).
 * The status is then applied with __chkinfo_set_status() and the replica's
 * recorded ltime in @chkstat is cleared to 0.
 *
 * @param seted passed through to __chkinfo_set_status()
 * @return 0 on success, otherwise EAGAIN, ENOENT or the error of
 *         __chkinfo_set_status().
 */
int chunk_proto_set(chkinfo_t *chkinfo, chkstat_t *chkstat, const nid_t *nid, int status, int *seted)
{
        int ret, idx;
        int target = -1;
        int nclean = 0;

        for (idx = 0; idx < (int)chkinfo->repnum; idx++) {
                if (chkinfo->diskid[idx].status == __S_CLEAN) {
                        nclean++;
                }

                if (nid_cmp(&chkinfo->diskid[idx].id, nid) == 0) {
                        target = idx;
                }
        }

        if (nclean == 0) {
                ret  = EAGAIN;
                GOTO(err_ret, ret);
        }

        if (target < 0) {
                ret  = ENOENT;
                GOTO(err_ret, ret);
        }

        ret = __chkinfo_set_status(chkinfo, target, status, seted);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        chkstat->repstat[target].ltime = 0;

        return 0;
err_ret:
        return ret;
}

/**
 * Set the parent of one replica of @chkid on node @nid.
 *
 * Uses replica_srv_setparent() for the local node and
 * replica_rpc_setparent() for a remote one.
 *
 * @return 0 on success, otherwise the error code of the underlying call.
 */
static int __chunk_proto_setparent__(const nid_t *nid, const chkid_t *chkid, const chkid_t *parent)
{
        int ret;

        ret = net_islocal(nid) ? replica_srv_setparent(chkid, parent)
                               : replica_rpc_setparent(nid, chkid, parent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Set @fileid as the parent on every reachable replica of a chunk.
 *
 * A replica whose node cannot be connected is logged and skipped, so the
 * function can return 0 without having updated every replica. The first
 * setparent failure on a reachable replica aborts the loop.
 *
 * @return 0 when no reachable replica failed, otherwise the first error.
 */
int chunk_proto_setparent(const chkinfo_t *chkinfo, const chkid_t *fileid)
{
        int ret, i;
        const nid_t *nid;

        for (i = 0; i < chkinfo->repnum; i++) {
                nid = &chkinfo->diskid[i].id;

                if (unlikely(network_connect(nid, NULL, 1, 0))) {
                        DWARN("connect %s fail\n", network_rname(nid));
                        continue;
                }

                ret = __chunk_proto_setparent__(nid, &chkinfo->id, fileid);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}


/**
 * Render a human-readable description of @chkinfo into @buf.
 *
 * Non-raw chunks are formatted with CHKINFO_STR() only. For raw chunks every
 * replica is listed as "<node>:<state>:<clock> ", where <state> is one of
 * vfm / dirty / check / offline / clean (checked in that order) and <clock>
 * is the replica's clock when @_clock is non-zero and chunk_getclock()
 * succeeds, "null" otherwise.
 *
 * @param chkinfo chunk to describe
 * @param _vfm    vfm to consult; when NULL it is loaded with md_vfm_get()
 *                into a thread-local buffer that is reused for as long as the
 *                table id (cid2tid) of the dumped chunk stays the same
 * @param buf     output buffer; the raw-chunk path writes at most MAX_BUF_LEN
 *                bytes (NOTE(review): the bound used by CHKINFO_STR is not
 *                visible here -- confirm)
 * @param _clock  non-zero to query each replica's clock
 */
void chkinfo_dump(const chkinfo_t *chkinfo, vfm_t *_vfm, char *buf, int _clock)
{
        int ret, i;
        size_t used, room;
        const chkid_t *id = &chkinfo->id;
        char *chkinfo_str_buf;
        const char *stat;
        const reploc_t *diskid;
        volid_t volid, tabid;
        const vfm_t *vfm;
        static __thread chkid_t __tabid__ = {0, 0, 0};
        static __thread char __buf__[MAX_BUF_LEN];
        char clockdump[MAX_NAME_LEN];
        clockstat_t clockstat;

        if (id->type != __RAW_CHUNK__) {
                CHKINFO_STR(chkinfo, buf);
                return;
        }

        if (_vfm) {
                vfm = _vfm;
        } else {
                cid2tid(&tabid, id);
                if (chkid_cmp(&tabid, &__tabid__) != 0) {
                        DBUG(""CHKID_FORMAT" load\n", CHKID_ARG(&tabid));

                        cid2fid(&volid, id);

                        ret = md_vfm_get(&volid, id, (void *)__buf__);
                        if (unlikely(ret)) {
                                /*
                                 * The buffer may have been partially overwritten
                                 * (cannot tell from here), so drop the cache key
                                 * to force a reload on the next call.
                                 */
                                memset(&__tabid__, 0, sizeof(__tabid__));
                                CHKINFO_STR(chkinfo, buf);
                                return;
                        }

                        __tabid__ = tabid;
                } else {
                        DBUG(""CHKID_FORMAT" exist\n", CHKID_ARG(&tabid));
                }

                vfm = (void *)__buf__;
        }

        chkinfo_str_buf = __malloc(MAX_BUF_LEN);
        if (chkinfo_str_buf == NULL) {
                /* no scratch space: fall back to the plain description */
                CHKINFO_STR(chkinfo, buf);
                return;
        }

        chkinfo_str_buf[0] = '\0';
        for (i = 0; i < (int)chkinfo->repnum; ++i) {
                diskid = &chkinfo->diskid[i];

                /*
                 * Best effort: the result is deliberately ignored, the state is
                 * taken from netable_connected() below.
                 */
                if (ng.daemon) {
                        (void) network_connect(&diskid->id, NULL, 0, 0);
                } else {
                        (void) network_connect(&diskid->id, NULL, 1, 0);
                }

                if (vfm_exist(vfm, &diskid->id)) {
                        stat = "vfm";
                } else if (diskid->status == __S_DIRTY) {
                        stat = "dirty";
                } else if (diskid->status == __S_CHECK) {
                        stat = "check";
                } else if (netable_connected(&diskid->id) == 0) {
                        stat = "offline";
                } else {
                        stat = "clean";
                }

                /*
                 * Only format the clock when it was actually fetched; on failure
                 * clockstat is uninitialised and must not be printed.
                 */
                if (_clock && chunk_getclock(&diskid->id, &chkinfo->id, &clockstat) == 0) {
                        snprintf(clockdump, sizeof(clockdump), VCLOCK_FORMAT",%u,%u",
                                 VCLOCK_ARG(&clockstat.vclock),
                                 clockstat.dirty,
                                 clockstat.lost);
                } else {
                        strcpy(clockdump, "null");
                }

                /* each entry is capped at MAX_NAME_LEN and at the space left in the buffer */
                used = strlen(chkinfo_str_buf);
                room = (size_t)MAX_BUF_LEN - used;
                if (room > (size_t)MAX_NAME_LEN) {
                        room = (size_t)MAX_NAME_LEN;
                }

                snprintf(chkinfo_str_buf + used, room, "%s:%s:%s ",
                         network_rname(&diskid->id), stat, clockdump);
        }

        snprintf(buf, MAX_BUF_LEN, "chunk %s info_version %llu @ [%s]",
                 id2str(&chkinfo->id), (LLU)chkinfo->info_version, chkinfo_str_buf);
        __free(chkinfo_str_buf);
}

/**
 * Initialise the chunk protocol layer.
 *
 * Runs, in order, chunk_bh_init(), chunk_proto_rep_init() and -- when
 * ENABLE_EC is set -- chunk_proto_ec_init(), stopping at the first failure.
 *
 * @return 0 on success, otherwise the error of the failing initialiser.
 */
int chunk_proto_init()
{
        int ret;

        if (unlikely((ret = chunk_bh_init()) != 0)) {
                GOTO(err_ret, ret);
        }

        if (unlikely((ret = chunk_proto_rep_init()) != 0)) {
                GOTO(err_ret, ret);
        }

#if ENABLE_EC
        if (unlikely((ret = chunk_proto_ec_init()) != 0)) {
                GOTO(err_ret, ret);
        }
#endif

        return 0;
err_ret:
        return ret;
}

/* Registered protocol implementations, indexed by protocol type (see chunk_proto_ops_get()). */
struct chunk_proto_ops *chunk_ops[CHUNK_PROTO_MAX];

/**
 * Register the operations table for one protocol type.
 *
 * @param hook operations table stored in chunk_ops[type]; a previously
 *             registered table for the same type is replaced
 * @param type index into chunk_ops; must be in [0, CHUNK_PROTO_MAX) (asserted)
 * @return always 0.
 */
int chunk_proto_ops_register(struct chunk_proto_ops *hook, int type)
{
        YASSERT(type >= 0 && type < CHUNK_PROTO_MAX);
        chunk_ops[type] = hook;
        return 0;
}

#if 1

/**
 * Select the operations table for a chunk from its EC descriptor.
 *
 * With ENABLE_EC, returns the CHUNK_PROTO_EC table when EC_ISEC(ec) holds and
 * the CHUNK_PROTO_REP table otherwise; without ENABLE_EC, @ec is ignored and
 * the CHUNK_PROTO_REP table is always returned.
 *
 * @return the registered table, i.e. whatever chunk_proto_ops_register()
 *         stored for that type (NULL if nothing was registered).
 */
inline struct chunk_proto_ops *__attribute__((always_inline)) chunk_proto_ops_get(const ec_t *ec)
{
#if ENABLE_EC
        return chunk_ops[EC_ISEC(ec) ? CHUNK_PROTO_EC : CHUNK_PROTO_REP];
#else
        (void) ec;
        return chunk_ops[CHUNK_PROTO_REP];
#endif
}

#else

/*
 * Alternative variant, currently compiled out by the enclosing `#if 1`.
 * It takes the chunk id as well and returns the CHUNK_PROTO_EC table only
 * when EC_ISEC(ec) holds and eclog_chunk_islog(chkid, ec) is false; every
 * other chunk gets the CHUNK_PROTO_REP table.
 * NOTE(review): the signature differs from the active variant, so callers
 * would need updating before this could be enabled.
 */
inline struct chunk_proto_ops *__attribute__((always_inline)) chunk_proto_ops_get(const chkid_t *chkid, const ec_t *ec)
{
#if ENABLE_EC
        if (EC_ISEC(ec) && !eclog_chunk_islog(chkid, ec)) {
                return chunk_ops[CHUNK_PROTO_EC];
        } else {
                return chunk_ops[CHUNK_PROTO_REP];
        }
#else
        (void) chkid;
        (void) ec;
        return chunk_ops[CHUNK_PROTO_REP];
#endif
}

#endif
